package com.atguigu.gmall.common.rabbit.config;

import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.common.rabbit.model.GmallCorrelationData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

import javax.annotation.PostConstruct;

/**
 * 确保生产者消息可靠性,完成生产者交换机确认回调方法跟队列确认回调方法
 *
 * @author: atguigu
 * @create: 2023-06-26 15:31
 */
@Slf4j
@Configuration
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;


    /**
     * 当前项目启动后自动触发该方法
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认方法  生产者发送消息成功返回ack-true 发送失败返回ack-false
     *
     * @param correlationData RabbitMQ 回调传递相关数据,只有发送时候设置了才有值
     * @param ack             正常:ack为true 异常:nack为false
     * @param cause           当ack为false,该变量封装错误原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机确认,发送消息到交换机成功!");
        } else {
            log.error("交换机确认异常,消息发送交换机异常:{},进行消息重发", cause);
            GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;
            this.retrySendMsg(gmallCorrelationData);
        }
    }


    /**
     * 队列确认方法-只有交换机路由消息队列失败才会进行回调
     *
     * @param message    返回业务消息对象
     * @param replyCode  应答码
     * @param replyText  错误原因
     * @param exchange   交换机名称
     * @param routingKey 路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("队列确认失败,消息路由queue失败，应答码={}，原因={}，交换机={}，路由键={}，消息={},进行生产者消息重发",
                replyCode, replyText, exchange, routingKey, message.toString());
        //重试 从消息对象中属性中获取头信息
        String id = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        String key = "mq:" + id;
        String dataStr = (String) redisTemplate.opsForValue().get(key);
        GmallCorrelationData gmallCorrelationData = JSON.parseObject(dataStr, GmallCorrelationData.class);
        this.retrySendMsg(gmallCorrelationData);

    }


    /**
     * 生产者异常后进行消息重发
     *
     * @param gmallCorrelationData
     */
    private void retrySendMsg(GmallCorrelationData gmallCorrelationData) {
        //处理延迟插件方式1:必然触发队列异常确认
        if(gmallCorrelationData.isDelay()){
            return;
        }
        //判断重试次数
        if (gmallCorrelationData.getRetryCount() >= 3) {
            log.error("发送业务数据重试已达上限:{}", gmallCorrelationData);
            return;
        }
        gmallCorrelationData.setRetryCount(gmallCorrelationData.getRetryCount() + 1);
        String key = "mq:"+gmallCorrelationData.getId();
        redisTemplate.opsForValue().set(key, JSON.toJSONString(gmallCorrelationData));
        if(gmallCorrelationData.isDelay()){
            //采用延迟 消息方法方式 设置延迟消息 延迟时间
            rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), (message) -> {
                message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime() * 1000);
                return message;
            }, gmallCorrelationData);
        }else{
            //3.调用模板对象进行消息发送-普通消息重发
            rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(), gmallCorrelationData.getRoutingKey(), gmallCorrelationData.getMessage(), gmallCorrelationData);

        }
    }
}
